//import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;



/**
 * Word-count demo of Flink's {@code combineGroup} applied to a grouped {@code DataSet}.
 *
 * <p>Each line of {@code dataset_api/Java/mapper.csv} (resolved against the {@code user.dir}
 * working directory) is treated as one word. Identical words are grouped and pre-aggregated with
 * a {@link GroupCombineFunction}. The resulting partial counts are then grouped by word again and
 * summed with a {@link GroupReduceFunction} to obtain the final count per word.
 */
public class GroupCombineonaGroupedDataSet
{

    /**
     * Builds and runs the combine-then-reduce word count, printing the intermediate partial
     * counts and the final per-word counts to standard output.
     *
     * @param args unused
     * @throws Exception if reading the input or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception
    {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String rootPath = System.getProperty("user.dir") + "/dataset_api/Java";
        // Every line of the input file is treated as a single word.
        DataSet<String> input = env.readTextFile("file:///" + rootPath + "/" + "mapper.csv");

        // A plain String has no tuple fields, so groupBy(0) does not apply. Group on the word
        // itself via an identity key selector instead. Without any grouping, combineGroup would
        // receive whole partitions and emit (last word seen, partition size) rather than
        // per-word counts.
        DataSet<Tuple2<String, Integer>> combinedWords = input
                .groupBy(word -> word) // group identical words
                .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>>()
                {
                    @Override
                    public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>> out) throws Exception
                    {
                        String key = null;
                        int count = 0;
                        for (String word : words)
                        {
                            key = word;
                            count++;
                        }
                        // Emit the word with its (possibly partial) count. combineGroup is a
                        // pre-aggregation step, so the same word may appear in several tuples.
                        out.collect(new Tuple2<>(key, count));
                    }
                });

        // Note: print() triggers a job execution, so the pipeline up to here runs once for this
        // print and again as part of computing `output` below.
        System.out.println("-----------------------输出combinedWords-------------------------------");
        combinedWords.print();

        DataSet<Tuple2<String, Integer>> output = combinedWords
                .groupBy(0) // regroup the partial results by word (field f0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>()
                {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Integer>> partials, Collector<Tuple2<String, Integer>> out) throws Exception
                    {
                        String key = null;
                        int count = 0;
                        for (Tuple2<String, Integer> partial : partials)
                        {
                            key = partial.f0;
                            // Sum the partial counts. Counting the tuples would only yield the
                            // number of partial results produced by the combiner, not the number
                            // of occurrences of the word.
                            count += partial.f1;
                        }
                        out.collect(new Tuple2<>(key, count));
                    }
                });

        System.out.println("-----------------------输出output------------------------------");
        output.print();
    }
}









